#include "kafka_client.h"
#include <cstdarg>
namespace KafkaClient {
	/**
	 * @brief Message delivery report callback.
	 *
	 * Invoked exactly once per produced message: either the message was
	 * successfully delivered (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)
	 * or it permanently failed (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
	 *
	 * Triggered from rd_kafka_poll()/rd_kafka_flush() and therefore runs on
	 * the application's thread, not a librdkafka internal thread.
	 */
	void
	dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
		if (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
			cbprintf(
					"%% Message delivered (%zd bytes, "
					"partition %" PRId32 ")\n",
					rkmessage->len, rkmessage->partition);
		} else {
			cbprintf( "%% Message delivery failed: %s\n",
					rd_kafka_err2str(rkmessage->err));
		}
		/* The rkmessage is owned and destroyed automatically by librdkafka. */
	}



	/** @brief Construct a producer bound to @p sbrokers / @p stopic and initialize it. */
	kafka_producer::kafka_producer(const std::string & sbrokers, const std::string & stopic)
		: m_topic(stopic)
		, m_brokers(sbrokers)
	{
		init();
	}

	/** @brief Flush any pending messages and tear the producer down. */
	kafka_producer::~kafka_producer()
	{
		exit();
	}

	bool kafka_producer::init()
	{
		if (rk)
			return true;
		rd_kafka_conf_t *conf; /* Temporary configuration object */
		char errstr[512];      /* librdkafka API error reporting buffer */
		const char *brokers = m_brokers.c_str();   /* Argument: broker list */
		/*
		 * Create Kafka client configuration place-holder
		 */
		conf = rd_kafka_conf_new();
		/* Set bootstrap broker(s) as a comma-separated list of
		 * host or host:port (default port 9092).
		 * librdkafka will use the bootstrap brokers to acquire the full
		 * set of brokers from the cluster. */
		if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
							  sizeof(errstr)) != RD_KAFKA_CONF_OK) {
			cbprintf( "%s\n", errstr);
			rd_kafka_conf_destroy(conf);
			return false;
		}
		if (rd_kafka_conf_set(conf, "compression.type", "zstd", errstr,
							  sizeof(errstr)) != RD_KAFKA_CONF_OK) {
			cbprintf( "%s\n", errstr);
			rd_kafka_conf_destroy(conf);
			return false;
		}
		if (rd_kafka_conf_set(conf, "compression.level", "9", errstr,
							  sizeof(errstr)) != RD_KAFKA_CONF_OK) {
			cbprintf( "%s\n", errstr);
			rd_kafka_conf_destroy(conf);
			return false;
		}

		/* Set the delivery report callback.
		 * This callback will be called once per message to inform
		 * the application if delivery succeeded or failed.
		 * See dr_msg_cb() above.
		 * The callback is only triggered from rd_kafka_poll() and
		 * rd_kafka_flush(). */
		rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

		/*
		 * Create producer instance.
		 *
		 * NOTE: rd_kafka_new() takes ownership of the conf object
		 *       and the application must not reference it again after
		 *       this call.
		 */
		rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
		if (!rk) {
			cbprintf( "%% Failed to create new producer: %s\n",
					errstr);
			return false;
		}
		conf = NULL; /* Configuration object is now owned, and freed,
					  * by the rd_kafka_t instance. */

		return true;

	}
	/**
	 * @brief Flush outstanding messages and destroy the producer handle.
	 *
	 * Safe to call when init() failed or exit() already ran (rk == nullptr).
	 * Always returns true; undelivered messages are only reported.
	 */
	bool kafka_producer::exit()
	{
		if (!rk)
			return true;

		/* Wait for final messages to be delivered or fail.
		 * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
		 * waits for all messages to be delivered. */
		cbprintf( "%% Flushing final messages..\n");
		rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);

		/* If the output queue is still not empty there is an issue
		 * with producing messages to the clusters. */
		const int remaining = rd_kafka_outq_len(rk);
		if (remaining > 0)
			cbprintf( "%% %d message(s) were not delivered\n",
					remaining);

		/* Destroy the producer instance */
		rd_kafka_destroy(rk);
		rk = nullptr; /* fix: removed duplicated dead `if (!rk) return true;` */

		return true;
	}

	/**
	 * @brief Produce one message (asynchronously) to the configured topic.
	 *
	 * On success the message is only ENQUEUED on the internal producer
	 * queue; background threads handle the actual delivery and the
	 * registered dr_msg_cb reports the final outcome.
	 *
	 * @param data   payload bytes (copied by librdkafka, RD_KAFKA_MSG_F_COPY)
	 * @param len    payload length; 0 means "only serve delivery reports"
	 * @param maxTry max enqueue attempts; on enqueue failure (e.g. local
	 *               queue full) we block-poll 1s to drain reports and retry
	 * @param key    optional message key (may be nullptr)
	 * @param keylen key length in bytes
	 * @return true if the message was enqueued (or len == 0), false otherwise
	 */
	bool kafka_producer::write(
			const char * data,
			const int len,
			const int maxTry /*=10*/,
			const char * key /*= nullptr*/,
			const int keylen /*= 0*/)
	{
		/* fix: guard against a producer whose init() failed — the original
		 * dereferenced a null rk here. */
		if (!rk)
			return false;

		const char *topic = m_topic.c_str();     /* Argument: topic to produce to */

		if (len == 0) {
			/* Empty line: only serve delivery reports */
			rd_kafka_poll(rk, 0 /*non-blocking */);
			return true;
		}

		int totalTry = 0;
		bool succeed = false;
		while (!succeed && totalTry++ < maxTry)
		{
			rd_kafka_resp_err_t err = rd_kafka_producev(
						/* Producer handle */
						rk,
						/* Topic name */
						RD_KAFKA_V_TOPIC(topic),
						/* Make a copy of the payload. */
						RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
						/* Message value and length */
						RD_KAFKA_V_VALUE((void *)data, len),
						/* Optional message key */
						RD_KAFKA_V_KEY((void *)key, keylen),
						/* Per-message opaque passed to dr_msg_cb */
						RD_KAFKA_V_OPAQUE(NULL),
						/* End sentinel */
						RD_KAFKA_V_END);

			if (err)
			{
				/* Failed to *enqueue* message for producing (commonly
				 * RD_KAFKA_RESP_ERR__QUEUE_FULL). Block-poll to drain
				 * delivery reports and free queue space before retrying. */
				cbprintf(
						"%% Failed to produce to topic %s: %s\n", topic,
						rd_kafka_err2str(err));
				rd_kafka_poll(rk,
							  1000 /*block for max 1000ms*/);
			}
			else
			{
				cbprintf(
						"%% Enqueued message (%d bytes) "
						"for topic %s\n",
						(int)len, topic);
				succeed = true;
				/* Serve delivery reports without blocking. */
				rd_kafka_poll(rk, 0 /*non-blocking */);
			}
		}

		return succeed;
	}


	/**
	 * @brief Construct a consumer for the given brokers, topics and group,
	 *        then initialize (and subscribe) immediately.
	 * NOTE(review): `topics` is taken by value to match the declaration in
	 * the header; changing it to const& would break the definition/declaration
	 * match — confirm against kafka_client.h before touching.
	 */
	kafka_consumer::kafka_consumer(
			const std::string & brokers,
			const std::vector<std::string> topics,
			const std::string & group)
		: m_stop(false)
		, m_topics(topics)
		, m_brokers(brokers)
		, m_group(group)
	{
		init();
	}

	/** @brief Close the consumer group membership and destroy the handle. */
	kafka_consumer::~kafka_consumer()
	{
		exit();
	}
	/**
	 * @brief Create the consumer handle, configure it and subscribe to
	 *        m_topics.
	 *
	 * Idempotent: returns true immediately if a handle already exists
	 * (fix: the original would create a second handle, leaking the first;
	 * this also matches kafka_producer::init()).
	 * On any failure every intermediate resource is released and false is
	 * returned.
	 */
	bool kafka_consumer::init()
	{
		if (rk)
			return true;

		char errstr[512];        /* librdkafka API error reporting buffer */
		rd_kafka_conf_t *conf = rd_kafka_conf_new();

		/* Set one configuration property, logging the error on failure. */
		auto set_prop = [&](const char *name, const char *value) -> bool {
			if (rd_kafka_conf_set(conf, name, value, errstr,
								  sizeof(errstr)) != RD_KAFKA_CONF_OK) {
				cbprintf( "%s\n", errstr);
				return false;
			}
			return true;
		};

		/* bootstrap.servers: comma-separated host[:port] list used to
		 *   discover the full broker set (default port 9092).
		 * group.id: consumers sharing the id join one group; the topics'
		 *   partitions are distributed among them according to
		 *   partition.assignment.strategy.
		 * auto.offset.reset=earliest: with no previously committed offset,
		 *   start reading the partition from the beginning. */
		if (!set_prop("bootstrap.servers", m_brokers.c_str()) ||
			!set_prop("group.id", m_group.c_str()) ||
			!set_prop("auto.offset.reset", "earliest")) {
			rd_kafka_conf_destroy(conf);
			return false;
		}

		/*
		 * Create consumer instance.
		 *
		 * NOTE: rd_kafka_new() takes ownership of the conf object ONLY on
		 *       success; on failure the application must destroy it
		 *       (the original code leaked conf here).
		 */
		rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
		if (!rk) {
			cbprintf( "%% Failed to create new consumer: %s\n",
					errstr);
			rd_kafka_conf_destroy(conf); /* fix: not freed by rd_kafka_new() on failure */
			return false;
		}
		conf = NULL; /* Configuration object is now owned, and freed,
					  * by the rd_kafka_t instance. */

		/* Redirect all messages from per-partition queues to the main
		 * queue so messages from every assigned partition can be consumed
		 * with a single rd_kafka_consumer_poll() call. */
		rd_kafka_poll_set_consumer(rk);

		const int topic_cnt = (int)m_topics.size();

		/* Convert the list of topics to a format suitable for librdkafka. */
		rd_kafka_topic_partition_list_t *subscription =
				rd_kafka_topic_partition_list_new(topic_cnt);
		for (int i = 0; i < topic_cnt; i++)
			rd_kafka_topic_partition_list_add(subscription, m_topics[i].c_str(),
											  /* partition is ignored by subscribe() */
											  RD_KAFKA_PARTITION_UA);

		/* Subscribe to the list of topics */
		rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, subscription);
		if (err) {
			cbprintf( "%% Failed to subscribe to %d topics: %s\n",
					subscription->cnt, rd_kafka_err2str(err));
			rd_kafka_topic_partition_list_destroy(subscription);
			rd_kafka_destroy(rk);
			rk = nullptr;
			return false;
		}

		cbprintf(
				"%% Subscribed to %d topic(s), "
				"waiting for rebalance and messages...\n",
				subscription->cnt);

		rd_kafka_topic_partition_list_destroy(subscription);

		return true;
	}
	/**
	 * @brief Close the consumer and destroy the handle.
	 *
	 * No-op when rk is already null; always returns true.
	 */
	bool kafka_consumer::exit()
	{
		if (rk) {
			/* Close the consumer: commit final offsets and leave the group. */
			cbprintf( "%% Closing consumer\n");
			rd_kafka_consumer_close(rk);

			/* Destroy the consumer */
			rd_kafka_destroy(rk);
			rk = nullptr;
		}
		return true;
	}
	/**
	 * @brief Poll loop: deliver each received message to @p cb until stop()
	 *        is called from another thread.
	 *
	 * Returns false immediately (leaving m_stop set) when there is no
	 * consumer handle; otherwise returns true once the loop exits.
	 * Ownership of each rd_kafka_message_t stays here — it is destroyed
	 * after the callback returns, so @p cb must not retain the pointer.
	 */
	bool kafka_consumer::run(std::function<void (rd_kafka_message_t *)> cb )
	{
		m_stop = true;
		if (!rk)
			return false;
		m_stop = false;

		for (;;) {
			if (m_stop)
				break;

			/* Short 100ms timeout so a stop() request is noticed quickly. */
			rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
			if (msg == nullptr)
				continue; /* timeout: no message yet, poll again */

			if (msg->err) {
				/* Consumer errors are generally informational; the
				 * consumer recovers from them automatically. */
				cbprintf( "%% Consumer error: %s\n",
						rd_kafka_message_errstr(msg));
			} else if (cb) {
				cb(msg);
			}

			rd_kafka_message_destroy(msg);
		}

		cbprintf( "%% Consumer stopped\n");
		return true;
	}

	/**
	 * @brief Request the run() loop to exit; it notices within ~100ms.
	 * Always returns true.
	 */
	bool kafka_consumer::stop()
	{
		m_stop = true;
		return true;
	}


	/**
	 * @brief Default log sink: forwards printf-style arguments to stderr.
	 */
	void csprintf (const char * format,...)
	{
		va_list ap;
		va_start(ap, format);
		vfprintf(stderr, format, ap);
		va_end(ap);
	}

	/* Pluggable logging hook used throughout this file; defaults to stderr
	 * via csprintf. Reassign to redirect all client log output. */
	void (*cbprintf) (const char *,...) = csprintf;

}
